package com.heima.kafka.sample;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

/**
 * 生产者
 */
public class ProducerQuickStart {

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        // kafka连接配置信息
        Properties props = new Properties();
        // kafka连接地址
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.200.130:9092");
        // key和value的序列化
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // ack配置 消息确认机制
        props.put(ProducerConfig.ACKS_CONFIG, "all");

        // 重试次数
        props.put(ProducerConfig.RETRIES_CONFIG, 10);

        // 消息压缩
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");

        // 创建kafka生产者对象
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);

        // 发送消息
        /**
         * 第一个参数：topic
         * 第二个参数：消息的key
         * 第三个参数：消息的value
         */
        for (int i = 0; i < 5; i++) {
            ProducerRecord<String, String> kvProducerRecord = new ProducerRecord<>("itcast-topic-input", "hello kafka");
            producer.send(kvProducerRecord);
        }
        // 同步发送消息
        // RecordMetadata recordMetadata = producer.send(kvProducerRecord).get();
        // System.out.println(recordMetadata.offset());

        // 异步消息发送
        /*producer.send(kvProducerRecord, new Callback() {
            @Override
            public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                if(e != null) {
                    System.out.println("记录异常信息到日志表中");
                }
                System.out.println(recordMetadata.offset());
            }
        });

        // 关闭消息通道（必须要关闭，否则消息发送不成功）
        producer.close();*/
    }
}
